[FLINK-37730][Job Manager] Expose JM exception as K8s exceptions #978
rmetzger
left a comment
I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a look?
Thank you!
It'd be great to catch and turn every job exception into a k8s event, not just terminal job failures. It'd simplify collecting historical diagnostic data before an actual crash occurs.
@morhidi Sorry, I do not understand this. I am not checking only for terminal job failures; I am checking for all failures while the job is not in one of the terminal states.
Never mind, I misread it at first glance.
public static boolean createIfNotExists(
        KubernetesClient client,
        HasMetadata target,
Why can't we call createWithAnnotationsIfNotExists() from this method to avoid code duplication?
I thought about it, but I did not do it because of the event time. In our case we decided to use the exception time as the event time, but I am not sure how this should be handled for other k8s events, so I kept the two methods separate at the cost of some duplicated code.
cc @gyfora
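One way the duplication discussed above could be avoided is to let the plain entry point delegate to a shared method that takes the event time as a parameter. The following is only a minimal sketch under that assumption: `EventStoreSketch`, its in-memory map, and both signatures are hypothetical stand-ins and do not reflect the operator's actual `createIfNotExists` or `createWithAnnotationsIfNotExists` methods or the Kubernetes client API.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an event utility; not the operator's real class. */
final class EventStoreSketch {

    // Event key -> event time. A real implementation would talk to the K8s API.
    private final Map<String, Instant> events = new HashMap<>();

    /** Existing-style entry point: falls back to the current time as event time. */
    public boolean createIfNotExists(String key) {
        return createIfNotExists(key, Instant.now());
    }

    /** Shared implementation: the caller decides which event time to record. */
    public boolean createIfNotExists(String key, Instant eventTime) {
        // putIfAbsent returns null only when the key was not present before.
        return events.putIfAbsent(key, eventTime) == null;
    }

    /** Returns the recorded event time, or null if no such event exists. */
    public Instant eventTime(String key) {
        return events.get(key);
    }
}
```

With this shape, exception events could pass the exception timestamp explicitly, while other events keep their current behavior through the single-argument overload.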
gyfora
left a comment
I think this is looking pretty good now. I still added a few minor comments.
gyfora
left a comment
Looks good! As a future followup we could think about reducing the number of REST API calls we make to fetch exceptions.
At the moment this is done on every step, but based on the job details that we get in previous steps in the observers, we may be able to deduce that the job did not fail since the last time we checked, so exceptions do not need to be queried.
If you could open a follow up ticket for that I think that would be nice :)
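The follow-up idea above could look roughly like the sketch below. It assumes that the job details expose some marker that changes whenever the job fails or restarts; here that marker is a hypothetical `lastStateTransitionMillis` value. `ExceptionFetchGate` and its method are illustrative names only and are not part of the operator.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical gate that decides whether the exceptions endpoint needs to be queried. */
final class ExceptionFetchGate {

    // Job id -> marker observed during the previous reconciliation step.
    private final Map<String, Long> lastSeenTransition = new HashMap<>();

    /**
     * Returns true when the job is seen for the first time or its marker changed,
     * i.e. when new exceptions may exist and the REST call is worth making.
     */
    public boolean shouldFetch(String jobId, long lastStateTransitionMillis) {
        Long previous = lastSeenTransition.put(jobId, lastStateTransitionMillis);
        return previous == null || previous != lastStateTransitionMillis;
    }
}
```

Whether such a marker is reliable enough to skip the call is exactly what the follow-up ticket would need to establish.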
You need to regenerate the docs:
I hit the following error while running locally: So something seems to be off with the time handling
        }
    }
    ctx.getExceptionCacheEntry().setJobId(currentJobId);
    ctx.getExceptionCacheEntry().setLastTimestamp(now.toEpochMilli());
Shouldn't this be the max exception timestamp? Job exceptions could occur between fetching them from the REST API and emitting them here, and those would be missed if we set a higher timestamp based on now.
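To illustrate the concern, here is a minimal sketch of advancing the cached timestamp to the largest timestamp among the exceptions that were actually processed, rather than to the current time. `ExceptionWatermark`, `JobException`, and `advance` are hypothetical names used only for this example; the real cache entry and exception types in the PR may differ.

```java
import java.util.List;

/** Hypothetical sketch; names are illustrative and not the operator's classes. */
final class ExceptionWatermark {

    /** Minimal stand-in for one exception entry returned by the REST API. */
    static final class JobException {
        final String name;
        final long timestampMillis;

        JobException(String name, long timestampMillis) {
            this.name = name;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Returns the new "last seen" timestamp: the max timestamp of the exceptions
     * newer than lastTimestamp, or lastTimestamp itself if there were none.
     */
    public static long advance(long lastTimestamp, List<JobException> fetched) {
        long max = lastTimestamp;
        for (JobException e : fetched) {
            if (e.timestampMillis > lastTimestamp) {
                // A real implementation would emit the K8s event for e here.
                max = Math.max(max, e.timestampMillis);
            }
        }
        return max;
    }
}
```

Because the stored value never exceeds the newest exception that was seen, an exception raised after the REST call but before the cache update still has a timestamp above the stored value and is picked up on the next pass.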
String stacktrace = exception.getStacktrace();
if (stacktrace != null && !stacktrace.isBlank()) {
    String[] lines = stacktrace.split("\n");
    eventMessage.append("\n\nStacktrace (truncated):\n");
Done
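For readers following the stacktrace thread, the excerpt above could be completed along the following lines. This is a sketch only: it assumes the configured limit counts lines, and `StacktraceTruncation`, `appendStacktrace`, and `maxLines` are hypothetical names, not the PR's actual code.

```java
/** Hypothetical helper showing one way to cap the stacktrace in an event message. */
final class StacktraceTruncation {

    /** Appends at most maxLines lines of the stacktrace to the given message. */
    public static String appendStacktrace(String message, String stacktrace, int maxLines) {
        StringBuilder eventMessage = new StringBuilder(message);
        if (stacktrace != null && !stacktrace.isBlank()) {
            String[] lines = stacktrace.split("\n");
            eventMessage.append("\n\nStacktrace (truncated):\n");
            int limit = Math.min(maxLines, lines.length);
            for (int i = 0; i < limit; i++) {
                eventMessage.append(lines[i]).append("\n");
            }
            if (lines.length > limit) {
                // Tell the reader how much was cut off.
                eventMessage
                        .append("... (")
                        .append(lines.length - limit)
                        .append(" more lines)");
            }
        }
        return eventMessage.toString();
    }
}
```

Capping the message this way keeps each event small, which matters because Kubernetes events are not intended to carry large payloads.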

What is the purpose of the change
This pull request adds a new feature that periodically checks for job exceptions using the Flink REST API and raises them as Kubernetes events. This feature will be helpful for monitoring systems that want to post-process job exceptions.
This is ONLY introduced for Application mode and NOT Session mode.
Brief change log
SYSTEM_ADVANCED config for configuring the max number of exceptions reported and the max length of the stacktrace (defaults are 5 and 10 respectively)
Verifying this change
This change added tests and can be verified as follows:
Apart from the unit tests in this PR, this was tested using two simulations:
sql-test: Good running job, exception simulated by manually killing the TM
sql-test-failing: Job that has exception in the open method, repeatedly fails.
The two exceptions were produced both one at a time and simultaneously.
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: (yes / no)
Documentation